chore: load network accounts asynchronously in NTX Builder #1495
base: next
Conversation
Force-pushed from 53330ae to 5267cbf.
crates/ntx-builder/src/builder.rs
Outdated
```rust
tokio::spawn(async move {
    if let Err(err) = account_loader_store.stream_network_account_ids(account_tx).await {
        tracing::error!(%err, "failed to load network accounts from store");
```
Another question is whether we should be reacting to task failure here. As in, should we abort the ntx builder (or restart the task) if it fails? Or is logging a single error good enough?
Failure here would mean that we never load accounts with existing notes from the store unless they also get a new event coming in. So I think we should abort.
I went for the abort approach.
crates/ntx-builder/src/store.rs
Outdated
```rust
Err(err) => {
    // Exponential backoff with base 500ms and max 30s.
    let backoff = Duration::from_millis(500)
        .saturating_mul(1 << retry_counter)
```
This will overflow/roll over after 32 or 64 retries 😁 We also need to saturate the exponential (or cap the counter, I guess).
I fixed this here, and in some other places where we were using this pattern.
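For reference, a capped version of this backoff pattern might look like the sketch below (constants taken from the comment in the diff; the helper name is illustrative, not the PR's actual code):

```rust
use std::time::Duration;

/// Exponential backoff with base 500ms, capped at 30s.
fn backoff_for(retry_counter: u32) -> Duration {
    const BASE: Duration = Duration::from_millis(500);
    const MAX: Duration = Duration::from_secs(30);
    // Clamp the exponent before shifting: 500ms << 6 = 32s already exceeds
    // the cap, so anything above 6 is equivalent, and an unclamped counter
    // would eventually overflow the shift (panicking in debug builds).
    let exponent = retry_counter.min(6);
    BASE.saturating_mul(1u32 << exponent).min(MAX)
}
```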
crates/ntx-builder/src/store.rs
Outdated
```rust
/// This method is designed to be run in a background task, sending accounts to the main event
/// loop as they are loaded. This allows the ntx-builder to start processing mempool events
/// without waiting for all accounts to be preloaded.
#[instrument(target = COMPONENT, name = "store.client.load_committed_accounts", skip_all, err)]
```
There's a bit of a problem with the instrumentation as is. One needs to be careful with long-running tasks and traces. This is because a trace is only complete once the root span completes.
In this case, what we'll have is:
```
start load_committed_accounts
    fetch_page(0)
    submit_page(0)
    fetch_page(1)
    submit_page(1)
    ...
    ... a long, long, long time later
    fetch_page(chain tip)
    submit_page(chain tip)
close load_committed_accounts
```
Instead we shouldn't instrument this method at all, and each iteration of the loop should be its own root span. This means you'll want to reshuffle things a bit.
See here for an example with retries. The first example is what we have for each loop iteration. Let me know if this is still ambiguous.
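Concretely, the per-iteration shape could look roughly like this (a sketch; `load_page`, `BlockNumber::GENESIS`, and the argument types are stand-ins rather than the PR's actual code):

```rust
use tracing::Instrument;

// Each page gets its own root span, so a trace completes per iteration
// instead of staying open for the lifetime of the background task.
async fn stream_network_account_ids(
    client: &StoreClient,
    sender: &tokio::sync::mpsc::Sender<NetworkAccountPrefix>,
) -> Result<(), StoreError> {
    let mut next_block = Some(BlockNumber::GENESIS);
    while let Some(start) = next_block {
        // `parent: None` detaches the span from any caller span, making it
        // the root of its own trace.
        let span = tracing::info_span!(parent: None, "store.client.load_accounts_page");
        next_block = client.load_page(start, sender).instrument(span).await?;
    }
    Ok(())
}
```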
Ok, perfect. I will check the network-monitor for this too and open an issue if I find something.
```rust
for account in accounts {
    // If the receiver is dropped, stop loading.
    if sender.send(account).await.is_err() {
        return Ok(());
```
Might be worth an error/warn log here.
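For instance (a sketch; the message wording is illustrative):

```rust
if sender.send(account).await.is_err() {
    // The receiving half was dropped, so the event loop is gone; stop streaming.
    tracing::warn!("account receiver dropped, stopping network account preload");
    return Ok(());
}
```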
```rust
    }
} => {
    account_loader = None;
    result.context("account loader task panicked")??;
```
We have `flatten` from an error extension trait:
```diff
-result.context("account loader task panicked")??;
+result.context("account loader task panicked").flatten()?;
```
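For context, a minimal sketch of the shape such an extension trait could have (hypothetical; the real trait lives elsewhere in the codebase):

```rust
// Collapses the nested Result from awaiting a JoinHandle (outer Err = panic
// or cancellation, inner Err = the task's own error) into a single Result,
// assuming both layers have been converted to the same error type.
trait ResultFlattenExt<T, E> {
    fn flatten(self) -> Result<T, E>;
}

impl<T, E> ResultFlattenExt<T, E> for Result<Result<T, E>, E> {
    fn flatten(self) -> Result<T, E> {
        self.and_then(std::convert::identity)
    }
}
```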
```rust
Some(result) = async {
    match account_loader.as_mut() {
        Some(handle) => Some(handle.await),
        None => std::future::pending().await,
    }
} => {
```
An alternative to `Option<Handle>` is to reassign the handle itself upon completion, something like:
```rust
let mut account_loader_handle = tokio::spawn(...);
// ...
select! {
    result = account_loader_handle => {
        result
            .context("panicked while loading accounts from store")
            .flatten()?;
        tracing::info!("account loading from store completed");
        account_loader_handle = std::future::pending();
    }
}
```

```rust
tracing::Span::current()
    .record("chain_tip", pagination_info.chain_tip)
    .record("current_height", pagination_info.block_num);
```
This is the info for the next page, no? I would ignore it here since it will get recorded with its own page on the next loop iteration.
```rust
skip(self, accounts, sender),
fields(chain_tip = chain_tip, current_height = current_height)
```
I think it's okay to log this at the root span only:
```diff
-skip(self, accounts, sender),
-fields(chain_tip = chain_tip, current_height = current_height)
+skip_all
```
```rust
block_range: RangeInclusive<BlockNumber>,
sender: &tokio::sync::mpsc::Sender<NetworkAccountPrefix>,
) -> Result<Option<BlockNumber>, StoreError> {
    let (accounts, pagination_info) = self.fetch_page(block_range).await?;
```
Unfortunately tracing will only log the error's Display impl, while we usually want the full error report.
So instead we have to do this manually :*(
And the same for the error lower down.
```diff
-let (accounts, pagination_info) = self.fetch_page(block_range).await?;
+let (accounts, pagination_info) = self.fetch_page(block_range)
+    .await
+    .inspect_err(|err| tracing::Span::current().set_error(err))?;
```
A somewhat better way (to prevent missing this) is to construct an async chain so that it's all handled in one location. An example, from miden-node/crates/block-producer/src/block_builder/mod.rs, lines 136 to 155 in 2cc9742:
```rust
self.get_block_inputs(selected)
    .inspect_ok(BlockBatchesAndInputs::inject_telemetry)
    .and_then(|inputs| self.propose_block(inputs))
    .inspect_ok(|(proposed_block, _)| {
        ProposedBlock::inject_telemetry(proposed_block);
    })
    .and_then(|(proposed_block, inputs)| self.validate_block(proposed_block, inputs))
    .and_then(|(proposed_block, inputs, header, signature, body)| {
        self.prove_block(proposed_block, inputs, header, signature, body)
    })
    .inspect_ok(ProvenBlock::inject_telemetry)
    // Failure must be injected before the final pipeline stage, i.e. before commit is
    // called. The system cannot handle errors after it considers the process complete
    // (which makes sense).
    .and_then(|proven_block| async { self.inject_failure(proven_block) })
    .and_then(|proven_block| self.commit_block(mempool, proven_block))
    // Handle errors by propagating the error to the root span and rolling back the block.
    .inspect_err(|err| Span::current().set_error(err))
    .or_else(|err| async {
        self.rollback_block(mempool, block_num).await;
        Err(err)
    })
    .await
```
Though this may not be a good fit depending on how data is broken up.
Closes #1487.
Depends on #1453 and #1501 to fully work.